Skip to content

[HZ-5410] PNCounter for Asyncio#804

Merged
yuce merged 4 commits intohazelcast:masterfrom
yuce:asyncio-pncounter
May 8, 2026
Merged

[HZ-5410] PNCounter for Asyncio#804
yuce merged 4 commits intohazelcast:masterfrom
yuce:asyncio-pncounter

Conversation

@yuce
Copy link
Copy Markdown
Contributor

@yuce yuce commented Apr 22, 2026

Straightforward port of PNCounter proxy and its test to the asyncio client.

Proxy:

Tests:

Diff:

1c1
< import functools
---
> import asyncio
5,7c5,7
< from hazelcast.future import Future
< from hazelcast.proxy.base import Proxy
< from hazelcast.cluster import VectorClock
---
> from hazelcast.errors import NoDataMemberInClusterError
> from hazelcast.internal.asyncio_cluster import VectorClock
> from hazelcast.internal.asyncio_proxy.base import Proxy
9d8
<     pn_counter_add_codec,
10a10
>     pn_counter_add_codec,
13d12
< from hazelcast.errors import NoDataMemberInClusterError
18c17
< class PNCounter(Proxy["BlockingPNCounter"]):
---
> class PNCounter(Proxy):
63c62
<     def get(self) -> Future[int]:
---
>     async def get(self) -> int:
74c73
<         return self._invoke_internal(pn_counter_get_codec)
---
>         return await self._invoke_internal(pn_counter_get_codec)
76c75
<     def get_and_add(self, delta: int) -> Future[int]:
---
>     async def get_and_add(self, delta: int) -> int:
92c91,93
<         return self._invoke_internal(pn_counter_add_codec, delta=delta, get_before_update=True)
---
>         return await self._invoke_internal(
>             pn_counter_add_codec, delta=delta, get_before_update=True
>         )
94c95
<     def add_and_get(self, delta: int) -> Future[int]:
---
>     async def add_and_get(self, delta: int) -> int:
110c111,113
<         return self._invoke_internal(pn_counter_add_codec, delta=delta, get_before_update=False)
---
>         return await self._invoke_internal(
>             pn_counter_add_codec, delta=delta, get_before_update=False
>         )
112c115
<     def get_and_subtract(self, delta: int) -> Future[int]:
---
>     async def get_and_subtract(self, delta: int) -> int:
128c131,133
<         return self._invoke_internal(pn_counter_add_codec, delta=-1 * delta, get_before_update=True)
---
>         return await self._invoke_internal(
>             pn_counter_add_codec, delta=-1 * delta, get_before_update=True
>         )
130c135
<     def subtract_and_get(self, delta: int) -> Future[int]:
---
>     async def subtract_and_get(self, delta: int) -> int:
146c151
<         return self._invoke_internal(
---
>         return await self._invoke_internal(
150c155
<     def get_and_decrement(self) -> Future[int]:
---
>     async def get_and_decrement(self) -> int:
162c167
<         return self._invoke_internal(pn_counter_add_codec, delta=-1, get_before_update=True)
---
>         return await self._invoke_internal(pn_counter_add_codec, delta=-1, get_before_update=True)
164c169
<     def decrement_and_get(self) -> Future[int]:
---
>     async def decrement_and_get(self) -> int:
176c181
<         return self._invoke_internal(pn_counter_add_codec, delta=-1, get_before_update=False)
---
>         return await self._invoke_internal(pn_counter_add_codec, delta=-1, get_before_update=False)
178c183
<     def get_and_increment(self) -> Future[int]:
---
>     async def get_and_increment(self) -> int:
190c195
<         return self._invoke_internal(pn_counter_add_codec, delta=1, get_before_update=True)
---
>         return await self._invoke_internal(pn_counter_add_codec, delta=1, get_before_update=True)
192c197
<     def increment_and_get(self) -> Future[int]:
---
>     async def increment_and_get(self) -> int:
204c209
<         return self._invoke_internal(pn_counter_add_codec, delta=1, get_before_update=False)
---
>         return await self._invoke_internal(pn_counter_add_codec, delta=1, get_before_update=False)
216,222c221,224
<     def blocking(self) -> "BlockingPNCounter":
<         return BlockingPNCounter(self)
< 
<     def _invoke_internal(self, codec, **kwargs):
<         delegated_future = Future()
<         self._set_result_or_error(delegated_future, [], None, codec, **kwargs)
<         return delegated_future
---
>     async def _invoke_internal(self, codec, **kwargs) -> int:
>         delegated_future = asyncio.get_running_loop().create_future()
>         await self._set_result_or_error(delegated_future, [], None, codec, **kwargs)
>         return await delegated_future
224c226
<     def _set_result_or_error(
---
>     async def _set_result_or_error(
227c229
<         target = self._get_crdt_operation_target(excluded_addresses)
---
>         target = await self._get_crdt_operation_target(excluded_addresses)
246,260d247
<         future = self._invoke_on_target(request, target.uuid, codec.decode_response)
< 
<         checker_func = functools.partial(
<             self._check_invocation_result,
<             delegated_future=delegated_future,
<             excluded_addresses=excluded_addresses,
<             target=target,
<             codec=codec,
<             **kwargs
<         )
<         future.add_done_callback(checker_func)
< 
<     def _check_invocation_result(
<         self, future, delegated_future, excluded_addresses, target, codec, **kwargs
<     ):
262c249
<             result = future.result()
---
>             result = await self._ainvoke_on_target(request, target.uuid, codec.decode_response)
272c259,261
<             self._set_result_or_error(delegated_future, excluded_addresses, ex, codec, **kwargs)
---
>             await self._set_result_or_error(
>                 delegated_future, excluded_addresses, ex, codec, **kwargs
>             )
274c263
<     def _get_crdt_operation_target(self, excluded_addresses):
---
>     async def _get_crdt_operation_target(self, excluded_addresses):
281c270
<         self._current_target_replica_address = self._choose_target_replica(excluded_addresses)
---
>         self._current_target_replica_address = await self._choose_target_replica(excluded_addresses)
284,285c273,274
<     def _choose_target_replica(self, excluded_addresses):
<         replica_addresses = self._get_replica_addresses(excluded_addresses)
---
>     async def _choose_target_replica(self, excluded_addresses):
>         replica_addresses = await self._get_replica_addresses(excluded_addresses)
293c282
<     def _get_replica_addresses(self, excluded_addresses):
---
>     async def _get_replica_addresses(self, excluded_addresses):
297c286
<         replica_count = self._get_max_configured_replica_count()
---
>         replica_count = await self._get_max_configured_replica_count()
309c298
<     def _get_max_configured_replica_count(self):
---
>     async def _get_max_configured_replica_count(self):
314c303
<         count = self._invoke(
---
>         count = await self._invoke(
316c305
<         ).result()
---
>         )
325c314,315
<     def _to_vector_clock(self, timestamps):
---
>     @classmethod
>     def _to_vector_clock(cls, timestamps):
333,402c323,324
< class BlockingPNCounter(PNCounter):
<     __slots__ = ("_wrapped", "name", "service_name")
< 
<     def __init__(self, wrapped: PNCounter):
<         self.name = wrapped.name
<         self.service_name = wrapped.service_name
<         self._wrapped = wrapped
< 
<     def get(  # type: ignore[override]
<         self,
<     ) -> int:
<         return self._wrapped.get().result()
< 
<     def get_and_add(  # type: ignore[override]
<         self,
<         delta: int,
<     ) -> int:
<         return self._wrapped.get_and_add(delta).result()
< 
<     def add_and_get(  # type: ignore[override]
<         self,
<         delta: int,
<     ) -> int:
<         return self._wrapped.add_and_get(delta).result()
< 
<     def get_and_subtract(  # type: ignore[override]
<         self,
<         delta: int,
<     ) -> int:
<         return self._wrapped.get_and_subtract(delta).result()
< 
<     def subtract_and_get(  # type: ignore[override]
<         self,
<         delta: int,
<     ) -> int:
<         return self._wrapped.subtract_and_get(delta).result()
< 
<     def get_and_decrement(  # type: ignore[override]
<         self,
<     ) -> int:
<         return self._wrapped.get_and_decrement().result()
< 
<     def decrement_and_get(  # type: ignore[override]
<         self,
<     ) -> int:
<         return self._wrapped.decrement_and_get().result()
< 
<     def get_and_increment(  # type: ignore[override]
<         self,
<     ) -> int:
<         return self._wrapped.get_and_increment().result()
< 
<     def increment_and_get(  # type: ignore[override]
<         self,
<     ) -> int:
<         return self._wrapped.increment_and_get().result()
< 
<     def reset(  # type: ignore[override]
<         self,
<     ) -> None:
<         self._wrapped.reset()
< 
<     def destroy(self) -> bool:
<         return self._wrapped.destroy()
< 
<     def blocking(self) -> "BlockingPNCounter":
<         return self
< 
<     def __repr__(self) -> str:
<         return self._wrapped.__repr__()
---
> async def create_pn_counter_proxy(service_name, name, context):
>     return PNCounter(service_name, name, context)

@yuce yuce changed the title PNCounter for Asyncio [HZ-5410] PNCounter for Asyncio Apr 22, 2026
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Apr 22, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 94.38%. Comparing base (319a20a) to head (b6d2993).

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #804      +/-   ##
==========================================
+ Coverage   94.31%   94.38%   +0.06%     
==========================================
  Files         402      403       +1     
  Lines       26383    26489     +106     
==========================================
+ Hits        24884    25001     +117     
+ Misses       1499     1488      -11     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@yuce yuce requested a review from emreyigit April 22, 2026 08:57
@yuce yuce requested a review from gbarnett-hz May 5, 2026 09:05
Copy link
Copy Markdown
Member

@emreyigit emreyigit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just left a small comment. Rest is LGTM.

Comment thread hazelcast/asyncio/__init__.py Outdated
from hazelcast.internal.asyncio_proxy.pn_counter import PNCounter
from hazelcast.internal.asyncio_proxy.queue import Queue
from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap
from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems redundant.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed at: b6d2993

yuce added 2 commits May 8, 2026 16:41
# Conflicts:
#	hazelcast/internal/asyncio_client.py
#	hazelcast/internal/asyncio_proxy/manager.py
@yuce yuce merged commit d550a6e into hazelcast:master May 8, 2026
11 checks passed
@yuce yuce deleted the asyncio-pncounter branch May 8, 2026 14:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants